package com.jhhe.homework4_2

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark job that counts clicks and impressions per ad id and writes one
 * `adid,clickCount,impressionCount` line per ad.
 *
 * Each log line is split on whitespace; the ad id is taken from the fourth
 * field as everything after its last `=` (the whole field when it has no `=`).
 *
 * Optional program arguments (all positional, all defaulted):
 *   1. click log path      (default `data/click.log`)
 *   2. impression log path (default `data/imp.log`)
 *   3. output directory    (default `data/click_imp_result`)
 *
 * The output directory must not exist yet: `saveAsTextFile` refuses to write
 * into an existing directory. This is why the result is no longer written to
 * `data/`, which already holds the input logs.
 */
object ClickLogs {
  private val DefaultClickLogPath = "data/click.log"
  private val DefaultImpLogPath = "data/imp.log"
  private val DefaultOutputPath = "data/click_imp_result"

  /** Position of the whitespace-separated field that carries the ad id. */
  private val AdIdFieldIndex = 3

  /**
   * Entry point.
   *
   * @param args optional overrides: click log path, impression log path,
   *             output directory (in that order)
   */
  def main(args: Array[String]): Unit = {
    val clickLogPath = args.lift(0).getOrElse(DefaultClickLogPath)
    val impLogPath = args.lift(1).getOrElse(DefaultImpLogPath)
    val outputPath = args.lift(2).getOrElse(DefaultOutputPath)

    val conf = new SparkConf().setAppName("ClickImpLog").setMaster("local")
    val sc = new SparkContext(conf)

    // Always release the context, even when a stage fails.
    try {
      sc.setLogLevel("warn")

      val clkRDD = countByAdId(sc, clickLogPath)
      val impRDD = countByAdId(sc, impLogPath)

      // Full outer join keeps ads that appear in only one of the two logs;
      // the missing side is reported as 0.
      clkRDD.fullOuterJoin(impRDD)
        .map { case (adid, (clicks, imps)) =>
          s"$adid,${clicks.getOrElse(0)},${imps.getOrElse(0)}"
        }
        .saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  /**
   * Reads a log file and counts how many lines mention each ad id.
   *
   * Lines from which no ad id can be extracted are skipped.
   *
   * @param sc   active Spark context
   * @param path location of the log file
   * @return pairs of (ad id, number of occurrences)
   */
  private def countByAdId(sc: SparkContext, path: String): org.apache.spark.rdd.RDD[(String, Int)] =
    sc.textFile(path)
      .flatMap(line => extractAdId(line).iterator)
      .map(adid => (adid, 1))
      .reduceByKey(_ + _)

  /**
   * Extracts the ad id from one log line.
   *
   * @param line raw log line
   * @return the text after the last `=` of the fourth whitespace-separated
   *         field, or `None` when the line has fewer than four fields
   *         (for example a blank line)
   */
  private def extractAdId(line: String): Option[String] =
    line.split("\\s+").lift(AdIdFieldIndex).map { field =>
      field.substring(field.lastIndexOf("=") + 1)
    }
}
